package com.flow.framework.mq.system.listener.lifecycle;

import com.flow.framework.common.error.SystemErrorCode;
import com.flow.framework.common.exception.CheckedException;
import com.flow.framework.common.util.io.IoUtil;
import com.flow.framework.common.util.verify.VerifyUtil;
import com.flow.framework.core.pojo.dto.base.notify.BaseNotifyDto;
import com.flow.framework.core.properties.FrameworkCoreConfigProperties;
import com.flow.framework.facade.mq.module.service.IMqFrameworkModuleService;
import com.flow.framework.mq.constant.FrameworkMqConstant;
import com.flow.framework.mq.consumer.IConsumer;
import com.flow.framework.mq.consumer.listener.MessageQueueConsumerListener;
import com.flow.framework.mq.pojo.bo.QueueBo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.InitializingBean;

import java.util.*;

/**
 * Lifecycle listener that prepares the RabbitMQ topology for every registered
 * {@link IConsumer} and then registers one shared listener endpoint for all of them.
 *
 * <p>For each consumer the following is declared on the broker:
 * <ul>
 *   <li>a durable direct exchange named
 *       {@code bizCode + FrameworkMqConstant.MQ_COMBINATION_SPLIT_LINE + handleMsgServiceCode};</li>
 *   <li>a durable, non-exclusive, non-auto-delete queue named {@code bizCode};</li>
 *   <li>a binding between the two whose routing key equals the exchange name.</li>
 * </ul>
 *
 * @author luoguopiao
 * @version 0.0.1
 * @date 2022/3/26
 */
@Slf4j
public class MessageQueueLifecycleListener implements InitializingBean {

    /**
     * Id of the single listener endpoint that serves every consumer queue.
     */
    private static final String LISTENER_ENDPOINT_ID = "customization_common_message_listener_endpoint";

    /**
     * Message carried by every {@link CheckedException} raised from this class.
     */
    private static final String CONFIG_ERROR_MESSAGE = "mq config error";

    private final List<IConsumer<? extends BaseNotifyDto>> consumers;

    private final ConnectionFactory connectionFactory;

    private final RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    private final RabbitListenerContainerFactory<?> factory;

    private final FrameworkCoreConfigProperties frameworkCoreConfigProperties;

    private final IMqFrameworkModuleService mqFrameworkModuleService;

    /**
     * Creates the lifecycle listener.
     *
     * @param consumers                      consumers whose exchanges, queues and bindings are declared
     * @param connectionFactory              RabbitMQ connection factory used to declare the topology
     * @param rabbitListenerEndpointRegistry registry the shared listener endpoint is registered with
     * @param factory                        listener container factory used to build the listener container
     * @param frameworkCoreConfigProperties  framework core configuration, handed to the message listener
     * @param mqFrameworkModuleService       framework MQ module service, handed to the message listener
     */
    public MessageQueueLifecycleListener(List<IConsumer<? extends BaseNotifyDto>> consumers, ConnectionFactory connectionFactory, RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry,
                                         RabbitListenerContainerFactory<?> factory, FrameworkCoreConfigProperties frameworkCoreConfigProperties,
                                         IMqFrameworkModuleService mqFrameworkModuleService) {
        this.consumers = consumers;
        this.connectionFactory = connectionFactory;
        this.rabbitListenerEndpointRegistry = rabbitListenerEndpointRegistry;
        this.factory = factory;
        this.frameworkCoreConfigProperties = frameworkCoreConfigProperties;
        this.mqFrameworkModuleService = mqFrameworkModuleService;
    }

    /**
     * {@inheritDoc}
     *
     * <p>Does nothing when there are no consumers. Otherwise declares the broker topology for
     * every consumer and registers (and immediately starts) one listener container that
     * listens on all consumer queues.
     *
     * @throws Exception a {@link CheckedException} with {@link SystemErrorCode#MQ_ERROR} when a
     *                   required collaborator is missing, a consumer is misconfigured, or the
     *                   broker declaration fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (VerifyUtil.isEmpty(consumers)) {
            return;
        }
        if (VerifyUtil.hasEmpty(connectionFactory, rabbitListenerEndpointRegistry, factory)) {
            log.error("mq config error.");
            throw new CheckedException(SystemErrorCode.MQ_ERROR, CONFIG_ERROR_MESSAGE);
        }
        Map<String, IConsumer<? extends BaseNotifyDto>> routingKeyAndConsumerMap = new HashMap<>(16);
        Set<String> queueNames = new HashSet<>();
        declareTopology(routingKeyAndConsumerMap, queueNames);
        registerListenerContainer(routingKeyAndConsumerMap, queueNames);
    }

    /**
     * Declares exchange, queue and binding for every consumer over a short-lived,
     * non-transactional channel, filling the two result collections along the way.
     *
     * @param routingKeyAndConsumerMap receives routing key -> consumer entries
     * @param queueNames               receives the names of all declared queues
     * @throws CheckedException when a consumer is misconfigured or the broker declaration fails
     */
    private void declareTopology(Map<String, IConsumer<? extends BaseNotifyDto>> routingKeyAndConsumerMap,
                                 Set<String> queueNames) throws CheckedException {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.createConnection();
            channel = connection.createChannel(false);
            for (IConsumer<? extends BaseNotifyDto> consumer : consumers) {
                declareConsumerBinding(channel, consumer, routingKeyAndConsumerMap, queueNames);
            }
        } catch (CheckedException e) {
            // Validation failures were already logged with their specific reason where they were
            // raised; rethrow unchanged instead of logging and wrapping them a second time.
            throw e;
        } catch (Exception e) {
            log.error("mq config error.", e);
            throw new CheckedException(SystemErrorCode.MQ_ERROR, CONFIG_ERROR_MESSAGE);
        } finally {
            IoUtil.close(channel, connection);
        }
    }

    /**
     * Validates one consumer's {@link QueueBo} and declares its exchange, queue and binding.
     *
     * @param channel                  channel used for the declarations
     * @param consumer                 consumer being set up
     * @param routingKeyAndConsumerMap receives the routing key -> consumer entry
     * @param queueNames               receives the declared queue name
     * @throws CheckedException    when the queue descriptor, service code or biz code is empty
     * @throws java.io.IOException when a broker declaration fails
     */
    private void declareConsumerBinding(Channel channel, IConsumer<? extends BaseNotifyDto> consumer,
                                        Map<String, IConsumer<? extends BaseNotifyDto>> routingKeyAndConsumerMap,
                                        Set<String> queueNames) throws CheckedException, java.io.IOException {
        QueueBo queueBo = consumer.getQueueBo();
        if (VerifyUtil.isEmpty(queueBo)) {
            log.error("queue biz object is null");
            throw new CheckedException(SystemErrorCode.MQ_ERROR, CONFIG_ERROR_MESSAGE);
        }
        String handleMsgServiceCode = queueBo.getHandleMsgServiceCode();
        String bizCode = queueBo.getBizCode();
        if (VerifyUtil.hasEmpty(handleMsgServiceCode, bizCode)) {
            log.error("service code or biz code is empty, service code: {}, biz code: {}", handleMsgServiceCode, bizCode);
            throw new CheckedException(SystemErrorCode.MQ_ERROR, CONFIG_ERROR_MESSAGE);
        }

        // Exchange name and routing key are the same combined value; the queue is named by bizCode alone.
        String exchange = bizCode + FrameworkMqConstant.MQ_COMBINATION_SPLIT_LINE + handleMsgServiceCode;
        String routingKey = exchange;

        IConsumer<? extends BaseNotifyDto> previous = routingKeyAndConsumerMap.put(routingKey, consumer);
        if (previous != null && previous != consumer) {
            // Last registration wins (unchanged behaviour), but the replaced consumer would
            // otherwise silently stop receiving messages, so make the collision visible.
            log.warn("duplicate mq routing key: {}, consumer {} is replaced by {}", routingKey,
                    previous.getClass().getName(), consumer.getClass().getName());
        }

        channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(bizCode, true, false, false, null);
        channel.queueBind(bizCode, exchange, routingKey);
        queueNames.add(bizCode);
    }

    /**
     * Registers one listener endpoint covering all declared queues and starts its container
     * immediately.
     *
     * @param routingKeyAndConsumerMap routing key -> consumer map handed to the message listener
     * @param queueNames               queues the listener container subscribes to
     */
    private void registerListenerContainer(Map<String, IConsumer<? extends BaseNotifyDto>> routingKeyAndConsumerMap,
                                           Set<String> queueNames) {
        SimpleRabbitListenerEndpoint simpleRabbitListenerEndpoint = new SimpleRabbitListenerEndpoint();
        simpleRabbitListenerEndpoint.setId(LISTENER_ENDPOINT_ID);
        simpleRabbitListenerEndpoint.setQueueNames(queueNames.toArray(new String[0]));
        simpleRabbitListenerEndpoint.setMessageListener(new MessageQueueConsumerListener(routingKeyAndConsumerMap,
                frameworkCoreConfigProperties, mqFrameworkModuleService));
        rabbitListenerEndpointRegistry.registerListenerContainer(simpleRabbitListenerEndpoint, factory, true);
    }
}
